# Copyright (C) 2011 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#    * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#    * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#    * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import collections
import logging
import os
import six

from blinkpy.common.system.executive import ScriptError

from six import StringIO

_log = logging.getLogger(__name__)


class MockProcess(object):
    def __init__(self, stdout='MOCK STDOUT\n', stderr='', returncode=0):
        self.pid = 42
        self.stdout = StringIO(stdout)
        self.stderr = StringIO(stderr)
        self.stdin = StringIO()
        self.returncode = returncode

    def wait(self):
        return

    def poll(self):
        # Consider the process completed when all the stdout and stderr has been read.
        if (len(self.stdout.getvalue()) != self.stdout.tell()
                or len(self.stderr.getvalue()) != self.stderr.tell()):
            return None
        return self.returncode

    def communicate(self, *_):
        return (self.stdout.getvalue(), self.stderr.getvalue())

    def kill(self):
        return

    def terminate(self):
        return


MockCall = collections.namedtuple('MockCall', ('args', 'kwargs'))


class MockExecutive(object):
    PIPE = 'MOCK PIPE'
    STDOUT = 'MOCK STDOUT'
    DEVNULL = 'MOCK_DEVNULL'

    @staticmethod
    def ignore_error(error):
        pass

    def __init__(self,
                 should_log=False,
                 should_throw=False,
                 output='MOCK output of child process',
                 stderr='',
                 exit_code=0,
                 exception=None,
                 run_command_fn=None,
                 proc=None):
        self._should_log = should_log
        self._should_throw = should_throw
        # FIXME: Once executive wraps os.getpid() we can just use a static pid for "this" process.
        self._running_pids = {'run_blinkpy_tests.py': os.getpid()}
        self._output = output
        self._stderr = stderr
        self._exit_code = exit_code
        self._exception = exception
        self._run_command_fn = run_command_fn
        self._proc = proc
        self.full_calls = []

    def _append_call(self, args, **kwargs):
        self.full_calls.append(MockCall(args=args, kwargs=kwargs))

    def check_running_pid(self, pid):
        return pid in list(self._running_pids.values())

    def running_pids(self, process_name_filter):
        running_pids = []
        for process_name, process_pid in self._running_pids.items():
            if process_name_filter(process_name):
                running_pids.append(process_pid)

        _log.info('MOCK running_pids: %s', running_pids)
        return running_pids

    def command_for_printing(self, args):
        string_args = list(map(six.text_type, args))
        return ' '.join(string_args)

    # The argument list should match Executive.run_command, even if
    # some arguments are not used. pylint: disable=unused-argument
    def run_command(
            self,
            args,
            cwd=None,
            env=None,
            input=None,  # pylint: disable=redefined-builtin
            timeout_seconds=None,
            error_handler=None,
            return_exit_code=False,
            return_stderr=True,
            ignore_stderr=False,
            decode_output=True,
            debug_logging=True):
        self._append_call(args, cwd=cwd, input=input, env=env)

        assert isinstance(args, list) or isinstance(args, tuple)

        if self._should_log:
            env_string = ''
            if env:
                env_string = ', env=%s' % env
            input_string = ''
            if input:
                input_string = ', input=%s' % input
            _log.info('MOCK run_command: %s, cwd=%s%s%s', args, cwd,
                      env_string, input_string)

        if self._exception:
            raise self._exception  # pylint: disable=raising-bad-type
        if self._should_throw:
            raise ScriptError('MOCK ScriptError',
                              output=self._output,
                              exit_code=self._exit_code)

        if self._run_command_fn:
            return self._run_command_fn(args)

        if return_exit_code:
            return self._exit_code

        if self._exit_code and error_handler:
            script_error = ScriptError(
                script_args=args,
                exit_code=self._exit_code,
                output=self._output)
            error_handler(script_error)

        output = self._output
        if return_stderr:
            output += self._stderr
        if decode_output and not isinstance(output, six.text_type):
            output = output.decode('utf-8')

        return output

    def cpu_count(self):
        return 2

    def kill_process(self, pid, kill_tree=True):
        pass

    def interrupt(self, pid):
        pass

    def popen(self, args, cwd=None, env=None, **_):
        assert all(isinstance(arg, six.string_types) for arg in args)
        self._append_call(args, cwd=cwd, env=env)
        if self._should_log:
            cwd_string = ''
            if cwd:
                cwd_string = ', cwd=%s' % cwd
            env_string = ''
            if env:
                env_string = ', env=%s' % env
            _log.info('MOCK popen: %s%s%s', args, cwd_string, env_string)
        if not self._proc:
            self._proc = MockProcess(
                stdout=self._output,
                stderr=self._stderr,
                returncode=self._exit_code)
        return self._proc

    def call(self, args, **_):
        assert all(isinstance(arg, six.string_types) for arg in args)
        self._append_call(args)
        _log.info('Mock call: %s', args)

    def run_in_parallel(self, commands):
        assert len(commands)

        num_previous_calls = len(self.full_calls)
        command_outputs = []
        for cmd_line, cwd in commands:
            assert all(isinstance(arg, six.string_types) for arg in cmd_line)
            command_outputs.append(
                [0, self.run_command(cmd_line, cwd=cwd), ''])

        new_calls = self.full_calls[num_previous_calls:]
        self.full_calls = self.full_calls[:num_previous_calls]
        self.full_calls.append(new_calls)
        return command_outputs

    def map(self, thunk, arglist, processes=None):
        return map(thunk, arglist)

    @property
    def calls(self):
        # TODO(crbug.com/718456): Make self.full_calls always be an array of
        # arrays of MockCalls, rather than a union type, and possibly remove
        # this property in favor of direct "full_calls" access in unit tests.
        def get_args(v):
            if isinstance(v, list):
                return [get_args(e) for e in v]
            elif isinstance(v, MockCall):
                return v.args
            else:
                return TypeError(
                    'Unknown full_calls type: %s' % (type(v).__name__, ))

        return get_args(self.full_calls)


def mock_git_commands(vals, strict=False):
    # TODO(robertma): Support optional look-up by arguments.
    def run_fn(args):
        sub_command = args[1]
        if strict and sub_command not in vals:
            raise AssertionError('{} not found in sub-command list {}'.format(
                sub_command, vals))
        return vals.get(sub_command, '')

    return MockExecutive(run_command_fn=run_fn)
